
Conversation

@wangyum (Member) commented Mar 1, 2021

What changes were proposed in this pull request?

Push down limit through Window when the partitionSpec of all window functions is empty and the same ordering is used. This is a real case from production:

(screenshot of the production query plan omitted)

This PR supports two cases:

  1. All window functions have the same orderSpec:
    SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn, RANK() OVER(ORDER BY a) AS rk FROM t1 LIMIT 5;
    == Optimized Logical Plan ==
    Window [row_number() windowspecdefinition(a#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#4, rank(a#9L) windowspecdefinition(a#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#5], [a#9L ASC NULLS FIRST]
    +- GlobalLimit 5
       +- LocalLimit 5
          +- Sort [a#9L ASC NULLS FIRST], true
             +- Relation default.t1[A#9L,B#10L,C#11L] parquet
  2. There is a window function with a different orderSpec:
    SELECT a, ROW_NUMBER() OVER(ORDER BY a) AS rn, RANK() OVER(ORDER BY b DESC) AS rk FROM t1 LIMIT 5;
    == Optimized Logical Plan ==
    Project [a#9L, rn#4, rk#5]
    +- Window [rank(b#10L) windowspecdefinition(b#10L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#5], [b#10L DESC NULLS LAST]
       +- GlobalLimit 5
          +- LocalLimit 5
             +- Sort [b#10L DESC NULLS LAST], true
                +- Window [row_number() windowspecdefinition(a#9L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#4], [a#9L ASC NULLS FIRST]
                   +- Project [a#9L, b#10L]
                      +- Relation default.t1[A#9L,B#10L,C#11L] parquet

Why are the changes needed?

Improve query performance.

spark.range(500000000L).selectExpr("id AS a", "id AS b").write.saveAsTable("t1")
spark.sql("SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rowId FROM t1 LIMIT 5").show
Before this PR vs. after this PR:

(benchmark screenshots omitted)

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

github-actions bot added the SQL label Mar 1, 2021
SparkQA commented Mar 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40160/

SparkQA commented Mar 1, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40160/

SparkQA commented Mar 1, 2021

Test build #135579 has finished for PR 31691 at commit d04b678.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Mar 1, 2021

Test build #135590 has finished for PR 31691 at commit 512dd54.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

    WindowSpecDefinition(Nil, orderSpec,
      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
    if child.maxRows.forall(_ > limitVal) =>
  LocalLimit(
Contributor:

Do we still need the LocalLimit here? We already restrict the window expression to be RankLike and RowNumber, so we know the number of rows will not change before & after window, right?

@wangyum (Member, Author) replied Mar 2, 2021:

It should be optimized by EliminateLimits later. Otherwise, the plan cannot be further optimized, like this:

GlobalLimit 2
+- Window [row_number() windowspecdefinition(c#0 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#0], [c#0 DESC NULLS LAST]
   +- GlobalLimit 2
      +- LocalLimit 2
         +- Sort [c#0 DESC NULLS LAST], true
            +- LocalRelation [a#0, b#0, c#0]
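For context, EliminateLimits is the optimizer rule that collapses such stacked limits. A condensed sketch of what it does (a reconstruction for illustration; the real rule in Optimizer.scala handles more cases):

import org.apache.spark.sql.catalyst.expressions.{IntegerLiteral, Least}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object EliminateLimitsSketch extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Drop a Limit whose child can never produce more rows than the limit.
    case Limit(IntegerLiteral(limit), child) if child.maxRows.exists(_ <= limit) =>
      child
    // Collapse two adjacent limits into one with the smaller value.
    case GlobalLimit(le, GlobalLimit(ne, grandChild)) =>
      GlobalLimit(Least(Seq(ne, le)), grandChild)
    case LocalLimit(le, LocalLimit(ne, grandChild)) =>
      LocalLimit(Least(Seq(ne, le)), grandChild)
  }
}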

    if child.maxRows.forall(_ > limitVal) =>
  LocalLimit(
    limitExpr = limitExpr,
    child = window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child))))
Contributor:

Wondering why we need an extra Sort here? Shouldn't the physical plan rule EnsureRequirements add the sort between the window and the limit?

@wangyum (Member, Author):

Yes, the Sort is needed because we need a global sort.

Member:

If no partitionSpec is specified, I think the planner inserts an exchange to make a single partition. So we don't need a global sort here?

@wangyum (Member, Author):

The current logic is to sort first and then limit:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST]
   +- TakeOrderedAndProject(limit=5, orderBy=[a#10L ASC NULLS FIRST,b#11L ASC NULLS FIRST], output=[a#10L,b#11L])
      +- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>

If we remove the Sort, the logic becomes limit first and then sort:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST]
   +- Sort [a#10L ASC NULLS FIRST, b#11L ASC NULLS FIRST], false, 0
      +- GlobalLimit 5
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#30]
            +- LocalLimit 5
               +- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/opensource/spark/spark-warehouse/org.apache.spark...., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>

Member:

Ah, I see. Got it, thanks. I think it's better to leave some comments about why we need it here.
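For example, a code comment along these lines could work (illustrative wording only, not the exact comment that was added):

// A global Sort is added on purpose here: the pushed-down Limit must take
// the top-K rows across all partitions, and Sort + Limit lets the planner
// use TakeOrderedAndProject instead of a full sort of the whole input.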

Contributor:

Thanks for the explanation @wangyum, +1 for adding some comments.

SparkQA commented Mar 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40198/

SparkQA commented Mar 2, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40198/

SparkQA commented Mar 2, 2021

Test build #135619 has finished for PR 31691 at commit d1839b9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum (Member, Author) commented Mar 3, 2021

cc @cloud-fan


// Adding an extra Limit below WINDOW when there is only one RankLike/RowNumber
// window function and partitionSpec is empty.
case LocalLimit(limitExpr @ IntegerLiteral(limitVal),
Contributor:

Shall we respect TOP_K_SORT_FALLBACK_THRESHOLD here?
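(For reference, a guard like the sketch below could enforce it; the helper name is hypothetical, and conf.topKSortFallbackThreshold reads spark.sql.execution.topKSortFallbackThreshold:)

// Sketch: only push the limit down when it is small enough that the planner
// can turn the pushed Sort + Limit into TakeOrderedAndProject (top-K sort)
// instead of a full global sort.
private def withinTopKThreshold(limit: Int): Boolean =
  limit < conf.topKSortFallbackThreshold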

@cloud-fan (Contributor) commented Mar 3, 2021

I'm trying to understand the before/after data flow.

Before: input -> shuffle to one partition -> local sort and run rank function -> limit
After: input -> global top k (shuffle to one partition) -> run rank function -> limit

The optimization makes sense, but it seems like we can remove the final limit?

More questions: why only allow a single rank function? What's the requirement on the window frame?

@wangyum (Member, Author) commented Mar 3, 2021

After: input -> global top k (shuffle to one partition) -> run rank function -> limit

EliminateLimits will remove the final limit.
Before:

== Optimized Logical Plan ==
GlobalLimit 5, Statistics(sizeInBytes=140.0 B, rowCount=5)
+- LocalLimit 5, Statistics(sizeInBytes=1731.0 B)
   +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST], Statistics(sizeInBytes=1731.0 B)
      +- Relation default.t1[a#10L,b#11L] parquet, Statistics(sizeInBytes=1484.0 B)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- CollectLimit 5
   +- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST]
      +- Sort [a#10L ASC NULLS FIRST], false, 0
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#25]
            +- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>

After:

== Optimized Logical Plan ==
Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST], Statistics(sizeInBytes=140.0 B)
+- GlobalLimit 5, Statistics(sizeInBytes=120.0 B, rowCount=5)
   +- LocalLimit 5, Statistics(sizeInBytes=1484.0 B)
      +- Sort [a#10L ASC NULLS FIRST], true, Statistics(sizeInBytes=1484.0 B)
         +- Relation default.t1[a#10L,b#11L] parquet, Statistics(sizeInBytes=1484.0 B)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(a#10L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rowId#8], [a#10L ASC NULLS FIRST]
   +- TakeOrderedAndProject(limit=5, orderBy=[a#10L ASC NULLS FIRST], output=[a#10L,b#11L])
      +- FileScan parquet default.t1[a#10L,b#11L] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>

@cloud-fan (Contributor):

If we know the final limit will always be removed, why do we add it in the first place?

@wangyum (Member, Author) commented Mar 3, 2021

If we know the final limit will always be removed, why do we add it in the first place?

Fixed by b328375. This needs a small change to EliminateLimits:
(screenshot of the EliminateLimits diff omitted)

SparkQA commented Mar 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40287/

SparkQA commented Mar 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40287/

SparkQA commented Mar 3, 2021

Test build #135705 has finished for PR 31691 at commit b328375.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// Adding an extra Limit below WINDOW when there is only one RankLike/RowNumber
// window function and partitionSpec is empty.
case LocalLimit(limitExpr @ IntegerLiteral(limit),
    window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
Contributor:

Why only allow one rank?

@wangyum (Member, Author) replied Mar 4, 2021:

OK. Added support for multiple window functions when the partitionSpec of all window functions is empty and the same ordering is used. For example:

val numRows = 10
spark.range(numRows).selectExpr("IF (id % 2 = 0, null, id) AS a", s"${numRows} - id AS b", "id AS c").write.saveAsTable("t1")
spark.sql("SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn, DENSE_RANK() OVER(ORDER BY a) AS rk FROM t1 LIMIT 5").explain("cost")

Before:

GlobalLimit 5, Statistics(sizeInBytes=200.0 B, rowCount=5)
+- LocalLimit 5, Statistics(sizeInBytes=2.4 KiB)
   +- Window [row_number() windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#11, dense_rank(a#16L) windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#12], [a#16L ASC NULLS FIRST], Statistics(sizeInBytes=2.4 KiB)
      +- Relation default.t1[a#16L,b#17L,c#18L] parquet, Statistics(sizeInBytes=1994.0 B)

After:

Window [row_number() windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#11, dense_rank(a#16L) windowspecdefinition(a#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#12], [a#16L ASC NULLS FIRST], Statistics(sizeInBytes=200.0 B)
+- GlobalLimit 5, Statistics(sizeInBytes=160.0 B, rowCount=5)
   +- LocalLimit 5, Statistics(sizeInBytes=1994.0 B)
      +- Sort [a#16L ASC NULLS FIRST], true, Statistics(sizeInBytes=1994.0 B)
         +- Relation default.t1[a#16L,b#17L,c#18L] parquet, Statistics(sizeInBytes=1994.0 B)

case LocalLimit(limitExpr @ IntegerLiteral(limit),
    window @ Window(Seq(Alias(WindowExpression(_: RankLike | _: RowNumber,
      WindowSpecDefinition(Nil, orderSpec,
      SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _)), _, _, child))
Contributor:

Why must the frame be UnboundedPreceding, CurrentRow?

@wangyum (Member, Author):

Removed this check.

SparkQA commented Mar 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40328/

SparkQA commented Mar 4, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40328/

SparkQA commented Mar 4, 2021

Test build #135746 has finished for PR 31691 at commit eb081c1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

wangyum marked this pull request as draft March 5, 2021 02:10
SparkQA commented Mar 8, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40434/

SparkQA commented Mar 8, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40434/

SparkQA commented Mar 8, 2021

Test build #135852 has finished for PR 31691 at commit ee3a782.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

private def isSupportPushdownThroughWindow(
    windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
  case Alias(WindowExpression(_: RankLike | _: RowNumberLike,
      WindowSpecDefinition(Nil, _, _)), _) => true
Contributor:

Does it really work with any kind of window frames? Can we add some comments to explain it?

@wangyum (Member, Author):

The window frame of RankLike and RowNumberLike is UNBOUNDED PRECEDING to CURRENT ROW.

override val frame: WindowFrame = SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)

SparkQA commented Mar 10, 2021

Test build #135938 has finished for PR 31691 at commit 3ec12ad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member) left a comment:

Hi @wangyum, @cloud-fan, @maropu, @c21,

For safety, if you don't mind, can we have an independent rule, LimitPushDownThroughWindow?

cc @gatorsmile

@wangyum (Member, Author) commented Mar 12, 2021

+1 for adding an independent rule. It seems we can push down more cases, for example:

  1. Window function with PARTITION BY:
     SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM t LIMIT 10 =>
     SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM (SELECT * FROM t ORDER BY a, b LIMIT 10) tmp
  2. Window function with predicate:
     SELECT * FROM (SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM t) tmp WHERE rn < 100 LIMIT 10 =>
     SELECT *, ROW_NUMBER() OVER(PARTITION BY a ORDER BY b) AS rn FROM (SELECT * FROM t ORDER BY a, b LIMIT 10) tmp

@dongjoon-hyun (Member):

Thank you, @wangyum!

@c21 (Contributor) commented Mar 12, 2021

@dongjoon-hyun if the point is to be able to safely exclude the rule through a config if we find a bug after the next release, I am +1 for adding it as a separate rule.

btw I think it'd be good if we could rename the existing rule as well. Having two rules, LimitPushDown and LimitPushDownThroughWindow, is kind of confusing to me. Maybe we can have LimitPushDownThroughUnionAndJoin and LimitPushDownThroughWindow. I feel the same about RemoveNoopOperators and RemoveNoopUnion.

@dongjoon-hyun (Member):

if the point is to safely exclude rule through config if we find bug after next release, I am +1 for adding as a separate rule.

Yes, definitely, that was my point.

@dongjoon-hyun (Member):

BTW, for the renaming proposal, I'm not sure.

 * Pushes down [[LocalLimit]] beneath WINDOW.
 */
object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
  // The window frame of RankLike and RowNumberLike is UNBOUNDED PRECEDING to CURRENT ROW.
Contributor:

is -> can only be

Contributor:

It's probably better to add an assert below to prove this comment.
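Such an assert might look like the following sketch (illustrative only; names and placement may differ from the actual change):

case Alias(WindowExpression(_: RankLike | _: RowNumberLike,
    WindowSpecDefinition(Nil, _, frame)), _) =>
  // Sanity-check the comment above: rank-like functions always use the
  // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame.
  assert(frame == SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))
  true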

val originalQuery = testRelation
  .select(a, b, c,
    windowExpr(RowNumber(), windowSpec(a :: Nil, c.desc :: Nil, windowFrame)).as("rn"))
  .limit(20)
Member:

To test partitionSpec is not empty independently, we need .limit(2), don't we?

@wangyum (Member, Author):

Good catch.

}

/**
 * Pushes down [[LocalLimit]] beneath WINDOW.
Member:

Shall we itemize the functionality and limitations here? LimitPushDownThroughWindow has limited functionality, and it's difficult to track by reading the code. It would be helpful when we add a new feature and maintain this optimizer rule.

@wangyum (Member, Author):

Yes, I plan to explain it with SQL:

/**
 * Pushes down [[LocalLimit]] beneath WINDOW. This rule optimizes the following case:
 * {{{
 *   SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn FROM Tab1 LIMIT 5 ==>
 *   SELECT *, ROW_NUMBER() OVER(ORDER BY a) AS rn FROM (SELECT * FROM Tab1 ORDER BY a LIMIT 5) t
 * }}}
 */
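Assembling the hunks quoted in this review, the rule roughly takes the following shape (a reconstructed sketch, so details may differ from the merged code; per the PR description, the merged version also handles a Project between the LocalLimit and the Window):

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object LimitPushDownThroughWindow extends Rule[LogicalPlan] {
  // The window frame of RankLike and RowNumberLike can only be UNBOUNDED
  // PRECEDING to CURRENT ROW, so a top-K sort below the Window cannot
  // change their results.
  private def supportsPushdownThroughWindow(
      windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
    case Alias(WindowExpression(_: RankLike | _: RowNumberLike,
        WindowSpecDefinition(Nil, _,
        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true
    case _ => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Push an extra Limit (with a global Sort) below the Window when the
    // partitionSpec of all window functions is empty.
    case LocalLimit(limitExpr @ IntegerLiteral(limit),
        window @ Window(windowExpressions, Nil, orderSpec, child))
        if supportsPushdownThroughWindow(windowExpressions) &&
          child.maxRows.forall(_ > limit) && limit < conf.topKSortFallbackThreshold =>
      window.copy(child = Limit(limitExpr, Sort(orderSpec, true, child)))
  }
}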

private def supportsPushdownThroughWindow(
    windowExpressions: Seq[NamedExpression]): Boolean = windowExpressions.forall {
  case Alias(WindowExpression(_: RankLike | _: RowNumberLike,
      WindowSpecDefinition(Nil, _,
Member:

If you don't mind, can we merge lines 636 and 637?

case Alias(WindowExpression(_: RankLike | _: RowNumberLike, WindowSpecDefinition(Nil, _,
    SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))), _) => true

@dongjoon-hyun (Member) left a comment:

In addition, please put this new optimizer rule in a new file, @wangyum.

SparkQA commented Mar 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40680/

SparkQA commented Mar 16, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40680/

@cloud-fan (Contributor):

The Scala 2.13 failure is unrelated. Thanks, merging to master!
